package com.migrate.module.task;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.migrate.module.domain.EtlProgress;
import com.migrate.module.domain.RangeScroll;
import com.migrate.module.enumeration.EtlProgressStatus;
import com.migrate.module.enumeration.ProgressType;
import com.migrate.module.mapper.migrate.MigrateScrollMapper;
import com.migrate.module.migrate.ScrollProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * 定时尝试全量同步失败的数据重试
 *
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class ScrollTask {


    @Resource
    private MigrateScrollMapper migrateScrollMapper;
    @Resource
    private ScrollProcessor scrollProcessor;

    private static int STOP_TIME_LIMITING_VALUE = 600000;

    /**
     *  补偿全量同步过程中，出现的失败中断的流程
     */
    @Scheduled(fixedDelay = 60000)
    public void Scroll(){
        List<EtlProgress> progressList = queryFailAndStopProgressList();
        for (EtlProgress progress:progressList){
            if (progress.getRetryTimes() < 3){
                RangeScroll rangeScroll = new RangeScroll();
                rangeScroll.setId(progress.getId());
                rangeScroll.setStartScrollId(progress.getScrollId());
                rangeScroll.setTableName(progress.getLogicModel());
                rangeScroll.setDomain(progress.getDomain());
                rangeScroll.setPageSize(progress.getFinishRecord());
                rangeScroll.setStartTime(progress.getScrollTime());
                rangeScroll.setEndTime(progress.getScrollEndTime());
                rangeScroll.setCurTicketStage(progress.getCurTicketStage());
                rangeScroll.setTicket(progress.getTicket());
                rangeScroll.setRetryFlag(true);
                scrollProcessor.scroll(rangeScroll);
            }
        }
    }

    /**
     * 查询同步失败和 同步过程中任务异常停止的任务（未完成状态）
     * @return
     */
    public List<EtlProgress> queryFailAndStopProgressList(){
        EtlProgress etlFailProgress = new EtlProgress();
        etlFailProgress.setStatus(EtlProgressStatus.FAIL.getValue());
        etlFailProgress.setProgressType(ProgressType.RANGE_SCROLL.getValue());
        // 查询范围滚动的过程中，失败的数据
        List<EtlProgress> failProgressList = migrateScrollMapper.queryEtlProgressList(etlFailProgress);
        if (CollectionUtils.isNotEmpty(failProgressList)){
            return failProgressList;
        }

        EtlProgress etlStopProgress = new EtlProgress();
        etlStopProgress.setStatus(EtlProgressStatus.INIT.getValue());
        etlStopProgress.setProgressType(ProgressType.RANGE_SCROLL.getValue());
        List<EtlProgress> stopProgressList = migrateScrollMapper.queryEtlProgressList(etlStopProgress);
        log.info("检测到未完成任务数量：{}",stopProgressList.size());
        if (CollectionUtils.isNotEmpty(stopProgressList)){
            long nowTime = System.currentTimeMillis();
            //查询正在进行中，但是更新时间不在10分钟之内的迁移数据，说明服务异常停止了，继续开始同步
            stopProgressList = stopProgressList.stream().filter(etlProgress -> {
                if (nowTime - etlProgress.getUpdateTime().getTime() > STOP_TIME_LIMITING_VALUE){
                    return true;
                }
                return false;
            }).collect(Collectors.toList());
            return stopProgressList;
        } else {
            return new ArrayList<EtlProgress>();
        }
    }
}
